/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud.api.collections;

import static org.hamcrest.Matchers.containsString;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.solr.client.solrj.RemoteSolrException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.apache.HttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.SolrQuery;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.embedded.JettySolrRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Tests the Cloud Collections API. */
public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {

  private static final int MAX_TIMEOUT_SECONDS = 90;

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Before
  public void setupCluster() throws Exception {
    // we recreate per test - they need to be isolated to be solid
    configureCluster(2).addConfig("conf1", configset("cloud-minimal")).configure();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    shutdownCluster();
    super.tearDown();
  }

  @Test
  public void testSolrJAPICalls() throws Exception {

    final CloudSolrClient client = cluster.getSolrClient();

    RequestStatusState state =
        CollectionAdminRequest.createCollection("testasynccollectioncreation", "conf1", 1, 1)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);

    state =
        CollectionAdminRequest.createCollection("testasynccollectioncreation", "conf1", 1, 1)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame(
        "Recreating a collection with the same should have failed.",
        RequestStatusState.FAILED,
        state);

    state =
        CollectionAdminRequest.addReplicaToShard("testasynccollectioncreation", "shard1")
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("Add replica did not complete", RequestStatusState.COMPLETED, state);

    state =
        CollectionAdminRequest.splitShard("testasynccollectioncreation")
            .setShardName("shard1")
            .processAndWait(client, MAX_TIMEOUT_SECONDS * 2);
    assertEquals(
        "Shard split did not complete. Last recorded state: " + state,
        RequestStatusState.COMPLETED,
        state);
  }

  @Test
  public void testAsyncRequests() throws Exception {
    final String collection = "testAsyncOperations";
    final CloudSolrClient client = cluster.getSolrClient();

    RequestStatusState state =
        CollectionAdminRequest.createCollection(collection, "conf1", 1, 1)
            .setRouterName("implicit")
            .setShards("shard1")
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);

    cluster.waitForActiveCollection(collection, 1, 1);

    // Add a few documents to shard1
    int numDocs = TestUtil.nextInt(random(), 10, 100);
    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
    for (int i = 0; i < numDocs; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", i);
      doc.addField("_route_", "shard1");
      docs.add(doc);
    }
    client.add(collection, docs);
    client.commit(collection);

    SolrQuery query = new SolrQuery("*:*");
    query.set("shards", "shard1");
    assertEquals(numDocs, client.query(collection, query).getResults().getNumFound());

    state =
        CollectionAdminRequest.reloadCollection(collection)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("ReloadCollection did not complete", RequestStatusState.COMPLETED, state);

    state =
        CollectionAdminRequest.createShard(collection, "shard2")
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("CreateShard did not complete", RequestStatusState.COMPLETED, state);

    cluster.getZkStateReader().forceUpdateCollection(collection);

    // Add a doc to shard2 to make sure shard2 was created properly
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", numDocs + 1);
    doc.addField("_route_", "shard2");
    client.add(collection, doc);
    client.commit(collection);
    query = new SolrQuery("*:*");
    query.set("shards", "shard2");
    assertEquals(1, client.query(collection, query).getResults().getNumFound());

    state =
        CollectionAdminRequest.deleteShard(collection, "shard2")
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("DeleteShard did not complete", RequestStatusState.COMPLETED, state);

    state =
        CollectionAdminRequest.addReplicaToShard(collection, "shard1")
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("AddReplica did not complete", RequestStatusState.COMPLETED, state);

    // cloudClient watch might take a couple of seconds to reflect it
    waitForState(
        "Wait for replica in cluster state",
        collection,
        c -> {
          if (c == null) return false;
          Slice slice = c.getSlice("shard1");
          if (slice == null) {
            return false;
          }

          if (slice.getReplicas().size() == 2) {
            return true;
          }

          return false;
        });

    state =
        CollectionAdminRequest.createAlias("myalias", collection)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("CreateAlias did not complete", RequestStatusState.COMPLETED, state);

    query = new SolrQuery("*:*");
    query.set("shards", "shard1");
    assertEquals(numDocs, client.query("myalias", query).getResults().getNumFound());

    state =
        CollectionAdminRequest.deleteAlias("myalias").processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("DeleteAlias did not complete", RequestStatusState.COMPLETED, state);

    try {
      client.query("myalias", query);
      fail("Alias should not exist");
    } catch (SolrException e) {
      // expected
    }

    Slice shard1 =
        cluster.getSolrClient().getClusterState().getCollection(collection).getSlice("shard1");
    Replica replica = shard1.getReplicas().iterator().next();
    for (String liveNode : cluster.getSolrClient().getClusterState().getLiveNodes()) {
      if (!replica.getNodeName().equals(liveNode)) {
        state =
            new CollectionAdminRequest.MoveReplica(collection, replica.getName(), liveNode)
                .processAndWait(client, MAX_TIMEOUT_SECONDS);
        assertSame("MoveReplica did not complete", RequestStatusState.COMPLETED, state);
        break;
      }
    }
    cluster.getZkStateReader().forceUpdateCollection(collection);

    shard1 = cluster.getSolrClient().getClusterState().getCollection(collection).getSlice("shard1");
    String replicaName = shard1.getReplicas().iterator().next().getName();
    state =
        CollectionAdminRequest.deleteReplica(collection, "shard1", replicaName)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);

    state =
        CollectionAdminRequest.deleteCollection(collection)
            .processAndWait(client, MAX_TIMEOUT_SECONDS);
    assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
  }

  public void testAsyncIdRaceCondition() throws Exception {

    SolrClient[] clients = new SolrClient[cluster.getJettySolrRunners().size()];
    int j = 0;
    for (JettySolrRunner r : cluster.getJettySolrRunners()) {
      clients[j++] = new HttpSolrClient.Builder(r.getBaseUrl().toString()).build();
    }
    RequestStatusState state =
        CollectionAdminRequest.createCollection("testAsyncIdRaceCondition", "conf1", 1, 1)
            .setRouterName("implicit")
            .setShards("shard1")
            .processAndWait(cluster.getSolrClient(), MAX_TIMEOUT_SECONDS);
    assertSame("CreateCollection task did not complete!", RequestStatusState.COMPLETED, state);

    int numThreads = 10;
    final AtomicInteger numSuccess = new AtomicInteger(0);
    final AtomicInteger numFailure = new AtomicInteger(0);
    final CountDownLatch latch = new CountDownLatch(numThreads);

    ExecutorService es =
        ExecutorUtil.newMDCAwareFixedThreadPool(
            numThreads, new SolrNamedThreadFactory("testAsyncIdRaceCondition"));
    try {
      for (int i = 0; i < numThreads; i++) {
        es.execute(
            new Runnable() {

              @Override
              public void run() {
                CollectionAdminRequest.Reload reloadCollectionRequest =
                    CollectionAdminRequest.reloadCollection("testAsyncIdRaceCondition");
                latch.countDown();
                try {
                  latch.await();
                } catch (InterruptedException e) {
                  throw new RuntimeException();
                }

                try {
                  if (log.isInfoEnabled()) {
                    log.info("{} - Reloading Collection.", Thread.currentThread().getName());
                  }
                  reloadCollectionRequest.processAsync(
                      "repeatedId", clients[random().nextInt(clients.length)]);
                  numSuccess.incrementAndGet();
                } catch (SolrServerException | RemoteSolrException e) {
                  if (log.isInfoEnabled()) {
                    log.info("Exception during collection reloading, we were waiting for one: ", e);
                  }
                  assertThat(
                      e.getMessage(),
                      containsString("Task with the same requestid already exists. (repeatedId)"));
                  numFailure.incrementAndGet();
                } catch (IOException e) {
                  throw new RuntimeException();
                }
              }
            });
      }
      es.shutdown();
      assertTrue(es.awaitTermination(10, TimeUnit.SECONDS));
      assertEquals(1, numSuccess.get());
      assertEquals(numThreads - 1, numFailure.get());
    } finally {
      for (SolrClient client : clients) {
        client.close();
      }
    }
  }
}
